{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Train and deploy on Kubeflow from Notebooks\n",
    "\n",
    "This notebook introduces you to using Kubeflow Fairing to train and deploy a model to Kubeflow on multiple backends such as Google Kubernetes Engine (GKE), Google Cloud ML Engine and Azure Kubernetes Service (AKS). This notebook demonstrate how to:\n",
    " \n",
    "* Train an XGBoost model in a local notebook,\n",
    "* Use Kubeflow Fairing to train an XGBoost model remotely on Kubeflow using some of the supported backends,\n",
    "* Use Kubeflow Fairing to deploy a trained model to Kubeflow, and\n",
    "* Call the deployed endpoint for predictions.\n",
    "\n",
    "To learn more about how to run this notebook locally, see the guide to [training and deploying on GCP from a local notebook][gcp-local-notebook].\n",
    "\n",
    "[gcp-local-notebook]: https://kubeflow.org/docs/fairing/gcp-local-notebook/"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Set up your notebook for training an XGBoost model\n",
    "\n",
    "Import the libraries required to train this model."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install --user -r requirements.txt"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import argparse\n",
    "import logging\n",
    "import joblib\n",
    "import sys\n",
    "import pandas as pd\n",
    "from sklearn.metrics import mean_absolute_error\n",
    "from sklearn.model_selection import train_test_split\n",
    "from sklearn.impute import SimpleImputer\n",
    "from xgboost import XGBRegressor"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "logging.basicConfig(format='%(message)s')\n",
    "logging.getLogger().setLevel(logging.INFO)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Define a function to split the input file into training and testing datasets."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def read_input(file_name, test_size=0.25):\n",
    "    \"\"\"Read input data and split it into train and test.\"\"\"\n",
    "    data = pd.read_csv(file_name)\n",
    "    data.dropna(axis=0, subset=['SalePrice'], inplace=True)\n",
    "\n",
    "    y = data.SalePrice\n",
    "    X = data.drop(['SalePrice'], axis=1).select_dtypes(exclude=['object'])\n",
    "\n",
    "    train_X, test_X, train_y, test_y = train_test_split(X.values,\n",
    "                                                      y.values,\n",
    "                                                      test_size=test_size,\n",
    "                                                      shuffle=False)\n",
    "\n",
    "    imputer = SimpleImputer()\n",
    "    train_X = imputer.fit_transform(train_X)\n",
    "    test_X = imputer.transform(test_X)\n",
    "\n",
    "    return (train_X, train_y), (test_X, test_y)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Define functions to train, evaluate, and save the trained model."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def train_model(train_X,\n",
    "                train_y,\n",
    "                test_X,\n",
    "                test_y,\n",
    "                n_estimators,\n",
    "                learning_rate):\n",
    "    \"\"\"Train the model using XGBRegressor.\"\"\"\n",
    "    model = XGBRegressor(n_estimators=n_estimators, learning_rate=learning_rate)\n",
    "\n",
    "    model.fit(train_X,\n",
    "            train_y,\n",
    "            early_stopping_rounds=40,\n",
    "            eval_set=[(test_X, test_y)])\n",
    "\n",
    "    print(\"Best RMSE on eval: %.2f with %d rounds\" %\n",
    "               (model.best_score,\n",
    "                model.best_iteration+1))\n",
    "    return model\n",
    "\n",
    "def eval_model(model, test_X, test_y):\n",
    "    \"\"\"Evaluate the model performance.\"\"\"\n",
    "    predictions = model.predict(test_X)\n",
    "    logging.info(\"mean_absolute_error=%.2f\", mean_absolute_error(predictions, test_y))\n",
    "\n",
    "def save_model(model, model_file):\n",
    "    \"\"\"Save XGBoost model for serving.\"\"\"\n",
    "    joblib.dump(model, model_file)\n",
    "    logging.info(\"Model export success: %s\", model_file)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Define a class for your model, with methods for training and prediction."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class HousingServe(object):\n",
    "    \n",
    "    def __init__(self):\n",
    "        self.train_input = \"ames_dataset/train.csv\"\n",
    "        self.n_estimators = 50\n",
    "        self.learning_rate = 0.1\n",
    "        self.model_file = \"trained_ames_model.dat\"\n",
    "        self.model = None\n",
    "\n",
    "    def train(self):\n",
    "        (train_X, train_y), (test_X, test_y) = read_input(self.train_input)\n",
    "        model = train_model(train_X,\n",
    "                          train_y,\n",
    "                          test_X,\n",
    "                          test_y,\n",
    "                          self.n_estimators,\n",
    "                          self.learning_rate)\n",
    "\n",
    "        eval_model(model, test_X, test_y)\n",
    "        save_model(model, self.model_file)\n",
    "\n",
    "    def predict(self, X, feature_names=None):\n",
    "        \"\"\"Predict using the model for given ndarray.\"\"\"\n",
    "        if not self.model:\n",
    "            self.model = joblib.load(self.model_file)\n",
    "        # Do any preprocessing\n",
    "        prediction = self.model.predict(data=X)\n",
    "        # Do any postprocessing\n",
    "        return prediction"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Train an XGBoost model in a notebook\n",
    "\n",
    "Call `HousingServe().train()` to train your model, and then evaluate and save your trained model."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "HousingServe().train()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Set up Kubeflow Fairing for training and predictions\n",
    "\n",
    "Import the `fairing` library and configure the backend environment that your training or prediction job will run in."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "tags": [
     "parameters"
    ]
   },
   "outputs": [],
   "source": [
    "from kubeflow import fairing\n",
    "\n",
    "# Default to using the GKE backend.\n",
    "# Change to `KubeflowAzureBackend` or `KubeflowAWSBackend`,etc if you are on other cloud providers.\n",
    "# Appropriately modify settings below for the corresponding backend.\n",
    "FAIRING_BACKEND = 'KubeflowGKEBackend'\n",
    "\n",
    "# If using KubeflowAzureBackend and unless passed to notebook as parameters,\n",
    "# replace below with your configuration (remove < and > characters).\n",
    "# DOCKER_REGISTRY = '<your_registry>.azurecr.io'\n",
    "# AZURE_REGION = '<your_region>'\n",
    "# AZURE_RESOURCE_GROUP = '<your_resource_group>'\n",
    "# AZURE_STORAGE_ACCOUNT = '<your_storage_account>'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import importlib\n",
    "\n",
    "# The logic in this cell demonstrates what to change\n",
    "# if wanting to use a different backend such as Cloud ML Engine (GCPManagedBackend),\n",
    "# AKS (KubeflowAzureBackend) or EKS (KubeflowAWSBackend). This cell is also parametrized\n",
    "# so that it allows programmatic execution of the notebook using different backends.\n",
    "\n",
    "if FAIRING_BACKEND in ['KubeflowGKEBackend', 'GCPManagedBackend']:\n",
    "    # Setting up google container repositories (GCR) for storing output containers\n",
    "    GCP_PROJECT = fairing.cloud.gcp.guess_project_name()\n",
    "    DOCKER_REGISTRY = 'gcr.io/{}/fairing-job'.format(GCP_PROJECT)\n",
    "    BuildContext = None\n",
    "if FAIRING_BACKEND == 'KubeflowAzureBackend':\n",
    "    from kubeflow.fairing.builders.cluster.azurestorage_context import StorageContextSource\n",
    "    BuildContext = StorageContextSource(\n",
    "        region=AZURE_REGION, resource_group_name=AZURE_RESOURCE_GROUP,\n",
    "        storage_account_name=AZURE_STORAGE_ACCOUNT\n",
    "    )\n",
    "if FAIRING_BACKEND == 'KubeflowAWSBackend':\n",
    "    from kubeflow.fairing.builders.cluster.s3_context import S3ContextSource\n",
    "    # Change the configuration and use your own region, buckets.\n",
    "    AWS_ACCOUNT_ID = fairing.cloud.aws.guess_account_id()\n",
    "    AWS_REGION = 'us-west-2'\n",
    "    DOCKER_REGISTRY = '{}.dkr.ecr.{}.amazonaws.com'.format(AWS_ACCOUNT_ID, AWS_REGION)\n",
    "    S3_BUCKET = 'kubeflow-fairing-data'\n",
    "    \n",
    "    BuildContext = S3ContextSource(\n",
    "        aws_account=AWS_ACCOUNT_ID, region=AWS_REGION,\n",
    "        bucket_name=S3_BUCKET\n",
    "    )\n",
    "    \n",
    "BackendClass = getattr(importlib.import_module('kubeflow.fairing.backends'), FAIRING_BACKEND)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Train an XGBoost model remotely on Kubeflow\n",
    "\n",
    "Import the `TrainJob` and use the configured backend class. Kubeflow Fairing packages the `HousingServe` class, the training data, and the training job's software prerequisites as a Docker image. Then Kubeflow Fairing deploys and runs the training job on Kubeflow."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": false
   },
   "outputs": [],
   "source": [
    "from kubeflow.fairing import TrainJob\n",
    "train_job = TrainJob(HousingServe, input_files=['ames_dataset/train.csv', \"requirements.txt\"],\n",
    "                     docker_registry=DOCKER_REGISTRY,\n",
    "                     backend=BackendClass(build_context_source=BuildContext))\n",
    "train_job.submit()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Deploy the trained model to Kubeflow for predictions\n",
    "\n",
    "Import the `PredictionEndpoint` and use the configured backend class. Kubeflow Fairing packages the `HousingServe` class, the trained model, and the prediction endpoint's software prerequisites as a Docker image. Then Kubeflow Fairing deploys and runs the prediction endpoint on Kubeflow."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": false
   },
   "outputs": [],
   "source": [
    "from kubeflow.fairing import PredictionEndpoint\n",
    "endpoint = PredictionEndpoint(HousingServe, input_files=['trained_ames_model.dat', \"requirements.txt\"],\n",
    "                              service_type='LoadBalancer',\n",
    "                              docker_registry=DOCKER_REGISTRY,\n",
    "                              backend=BackendClass(build_context_source=BuildContext))\n",
    "endpoint.create()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Call the prediction endpoint\n",
    "\n",
    "Create a test dataset, then call the endpoint on Kubeflow for predictions."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "(train_X, train_y), (test_X, test_y) = read_input(\"ames_dataset/train.csv\")\n",
    "endpoint.predict_nparray(test_X)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Clean up the prediction endpoint\n",
    "\n",
    "Delete the prediction endpoint created by this notebook."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "endpoint.delete()"
   ]
  }
 ],
 "metadata": {
  "celltoolbar": "Tags",
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
